第14章 RAG架构与原理:基于LangChain构建对话知识库
学习目标
- 理解检索增强生成(RAG)的核心概念与架构
- 掌握使用LangChain框架实现RAG系统的方法
- 学习处理多种文档格式(PDF、Word等)并构建向量数据库
- 实现一个完整的对话知识库系统
什么是检索增强生成(RAG)?
检索增强生成(RAG)是一种结合检索系统与生成式AI的架构,通过从外部知识库检索相关信息来增强大语言模型的响应能力,特别适合构建专业领域的对话知识库系统。
RAG的核心流程
- 文档处理:加载并处理多种格式文档
- 文档分割:将文档切分为适当大小的片段
- 向量化:将文本片段转换为向量表示
- 知识检索:根据用户查询检索相关文档
- 上下文增强:将检索内容与用户查询结合
- 生成回答:利用增强上下文生成最终回答
使用LangChain实现RAG系统
LangChain提供了构建RAG系统所需的全套组件和工具链,使实现过程变得简单高效。
1. 文档加载与处理
LangChain提供多种文档加载器,支持各类文件格式:
python
from langchain.document_loaders import PyPDFLoader, TextLoader, Docx2txtLoader, CSVLoader
# 加载PDF文档
pdf_loader = PyPDFLoader("data/document.pdf")
pdf_docs = pdf_loader.load()
# 加载Word文档
docx_loader = Docx2txtLoader("data/document.docx")
docx_docs = docx_loader.load()
# 加载文本文件
text_loader = TextLoader("data/document.txt")
text_docs = text_loader.load()
# 加载CSV数据
csv_loader = CSVLoader("data/data.csv")
csv_docs = csv_loader.load()
# 合并所有文档
all_docs = pdf_docs + docx_docs + text_docs + csv_docs
2. 文档分割策略
将长文档切分为适合向量化的片段:
python
from langchain.text_splitter import RecursiveCharacterTextSplitter, CharacterTextSplitter
# 递归文本分割(推荐)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 每个块的字符数
chunk_overlap=200, # 相邻块之间的重叠字符数
separators=["\n\n", "\n", " ", ""], # 优先按段落、句子分割
length_function=len,
)
# 分割文档
chunks = text_splitter.split_documents(all_docs)
3. 文本向量化
使用本地嵌入模型将文本块转换为向量表示:
python
from langchain.embeddings import HuggingFaceEmbeddings
# 使用本地BGE嵌入模型(适合中文场景)
embeddings = HuggingFaceEmbeddings(
model_name="./bge-large-zh-v1.5", # 指向本地模型目录
model_kwargs={'device': 'cpu'}, # 使用CPU运行,对于GPU可设置为'cuda'
encode_kwargs={'normalize_embeddings': True} # 归一化嵌入向量以提高检索质量
)
# 或使用本地GTE嵌入模型(适合多语言场景)
# embeddings = HuggingFaceEmbeddings(
# model_name="./gte-large", # 指向本地模型目录
# model_kwargs={'device': 'cpu'},
# encode_kwargs={'normalize_embeddings': True}
# )
4. 向量数据库
使用Chroma向量数据库存储文档向量:
python
from langchain_chroma import Chroma
# 使用Chroma向量数据库
vectorstore = Chroma.from_documents(
documents=chunks,
embedding_function=embeddings, # 注意参数名为embedding_function
persist_directory="chroma_db" # 持久化存储目录
)
# 持久化保存数据库
vectorstore.persist()
# 后续可以重新加载已有数据库
vectorstore = Chroma(
persist_directory="chroma_db",
embedding_function=embeddings
)
5. 检索查询器
构建检索查询器,支持多种检索策略:
python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import OpenAI
# 基本检索(相似度搜索)
retriever = vectorstore.as_retriever(
search_type="similarity", # 支持 similarity, mmr
search_kwargs={"k": 5} # 返回前k个结果
)
# 高级检索(内容压缩)- 提取最相关的内容片段
llm = OpenAI(temperature=0)
compressor = LLMChainExtractor.from_llm(llm)
compression_retriever = ContextualCompressionRetriever(
base_retriever=retriever,
doc_compressor=compressor
)
6. 构建RAG链
将上述组件整合成一个完整的RAG系统:
python
from langchain.chains import RetrievalQA, ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain_openai import ChatOpenAI
# 初始化LLM
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)
# 简单问答链
qa_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff", # 支持stuff, map_reduce, refine, map_rerank
retriever=retriever,
return_source_documents=True # 返回来源文档
)
# 对话记忆
memory = ConversationBufferMemory(
memory_key="chat_history",
return_messages=True
)
# 对话型检索链
conversation_chain = ConversationalRetrievalChain.from_llm(
llm=llm,
retriever=retriever,
memory=memory,
return_source_documents=True
)
7. 构建简单交互界面
使用Streamlit快速构建对话知识库界面:
python
import streamlit as st
st.title("智能文档问答系统")
# 初始化会话状态
if "messages" not in st.session_state:
st.session_state.messages = []
# 显示对话历史
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# 接收用户输入
if prompt := st.chat_input("请输入您的问题"):
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# 生成回答
with st.chat_message("assistant"):
with st.spinner("思考中..."):
result = conversation_chain({"question": prompt})
response = result["answer"]
sources = result["source_documents"]
# 显示回答
st.markdown(response)
# 显示来源文档
with st.expander("查看来源"):
for i, doc in enumerate(sources):
st.markdown(f"**来源 {i+1}**")
st.markdown(doc.page_content)
st.markdown(f"*来自: {doc.metadata.get('source', '未知来源')}*")
st.session_state.messages.append({"role": "assistant", "content": response})
完整RAG系统实现示例
下面是一个完整的RAG系统实现,包括文档处理、向量存储和对话接口:
python
import os
from langchain.document_loaders import PyPDFLoader, Docx2txtLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain_chroma import Chroma
from langchain_openai import ChatOpenAI
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain.prompts import PromptTemplate
# 配置模型路径和API密钥
os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
class DocumentAssistant:
def __init__(self, docs_dir="./documents", model_dir="./bge-large-zh-v1.5"):
self.docs_dir = docs_dir
self.model_dir = model_dir
self.documents = []
self.chunks = []
self.vectorstore = None
self.conversation_chain = None
def load_documents(self):
"""加载多种格式的文档"""
for file in os.listdir(self.docs_dir):
file_path = os.path.join(self.docs_dir, file)
if file.endswith('.pdf'):
loader = PyPDFLoader(file_path)
self.documents.extend(loader.load())
elif file.endswith('.docx'):
loader = Docx2txtLoader(file_path)
self.documents.extend(loader.load())
elif file.endswith('.txt'):
loader = TextLoader(file_path)
self.documents.extend(loader.load())
print(f"已加载 {len(self.documents)} 个文档")
def process_documents(self):
"""处理文档:分割和向量化"""
# 文档分割
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
separators=["\n\n", "\n", "。", "!", "?", ".", " ", ""],
)
self.chunks = text_splitter.split_documents(self.documents)
print(f"文档已分割为 {len(self.chunks)} 个块")
# 创建向量存储
embeddings = HuggingFaceEmbeddings(
model_name=self.model_dir,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
self.vectorstore = Chroma.from_documents(
documents=self.chunks,
embedding_function=embeddings,
persist_directory="chroma_document_db"
)
# 保存向量库
self.vectorstore.persist()
print("向量存储已创建并保存")
def load_vectorstore(self):
"""加载已有向量库"""
if os.path.exists("chroma_document_db"):
embeddings = HuggingFaceEmbeddings(
model_name=self.model_dir,
model_kwargs={'device': 'cpu'},
encode_kwargs={'normalize_embeddings': True}
)
self.vectorstore = Chroma(
persist_directory="chroma_document_db",
embedding_function=embeddings
)
print("已加载向量存储")
return True
return False
def setup_qa_chain(self):
"""设置问答链"""
# 自定义提示模板
template = """
你是一个专业的知识助手,使用提供的上下文信息来回答用户的问题。
如果你不知道答案,就说你不知道,不要试图编造答案。
尽量使用中文回答问题,除非用户使用其他语言提问。
使用上下文中的信息来提供准确、有帮助的回答。
上下文信息:
{context}
对话历史:
{chat_history}
用户问题: {question}
"""
QA_PROMPT = PromptTemplate(
input_variables=["context", "chat_history", "question"],
template=template
)
# 设置对话记忆
memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# 创建对话链
self.conversation_chain = ConversationalRetrievalChain.from_llm(
llm=ChatOpenAI(temperature=0, model_name="gpt-3.5-turbo"),
retriever=self.vectorstore.as_retriever(search_kwargs={"k": 4}),
memory=memory,
combine_docs_chain_kwargs={"prompt": QA_PROMPT},
return_source_documents=True
)
print("问答链已设置完成")
def ask(self, question):
"""向系统提问"""
if not self.conversation_chain:
return "系统未初始化,请先加载文档"
result = self.conversation_chain({"question": question})
return {
"answer": result["answer"],
"sources": [{"content": doc.page_content, "source": doc.metadata.get("source", "未知")}
for doc in result.get("source_documents", [])]
}
# 使用示例
if __name__ == "__main__":
# 可以指定本地模型路径,默认使用 ./bge-large-zh-v1.5
assistant = DocumentAssistant(model_dir="./bge-large-zh-v1.5")
# 如果要使用GTE模型可以替换为 model_dir="./gte-large"
# 如果向量库已存在,直接加载
if not assistant.load_vectorstore():
# 否则重新处理文档
assistant.load_documents()
assistant.process_documents()
# 设置问答链
assistant.setup_qa_chain()
# 交互式问答
while True:
question = input("\n请输入问题 (输入'退出'结束): ")
if question.lower() in ['退出', 'exit', 'quit']:
break
response = assistant.ask(question)
print("\n回答:", response["answer"])
if len(response["sources"]) > 0:
print("\n来源文档:")
for i, source in enumerate(response["sources"]):
print(f"[{i+1}] {source['source']}: {source['content'][:150]}...")
RAG系统优化策略
1. 文档处理优化
- 智能分割:根据文档结构(标题、段落)进行语义分割
- 元数据丰富:为每个文档块添加来源、时间等元数据
- 内容过滤:去除无意义内容,如页眉页脚、广告等
python
# 添加元数据的例子
for i, doc in enumerate(documents):
doc.metadata["source"] = f"document_{i}.pdf"
doc.metadata["date"] = "2023-10-15" # 文档日期
doc.metadata["category"] = "finance" # 文档类别
2. 检索优化
- 混合检索:结合关键词检索和向量检索
- 查询扩展:通过LLM扩展原始查询,提高召回率
- 结果重排序:根据相关性对检索结果进行重新排序
python
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import EmbeddingsFilter
# 语义过滤器 - 提高精确性
embeddings_filter = EmbeddingsFilter(
embeddings=embeddings,
similarity_threshold=0.76 # 只保留相似度超过阈值的文档
)
# 压缩检索器
compression_retriever = ContextualCompressionRetriever(
base_retriever=vectorstore.as_retriever(search_kwargs={"k": 10}),
doc_compressor=embeddings_filter
)
3. 生成优化
- 结构化输出:使用输出解析器获取结构化回答
- 引用增强:在回答中添加引用标记,指向来源文档
- 一致性检查:确保生成内容与检索文档一致
python
from langchain.output_parsers import StructuredOutputParser, ResponseSchema
# 定义输出格式
response_schemas = [
ResponseSchema(name="answer", description="对问题的回答"),
ResponseSchema(name="sources", description="支持回答的信息来源"),
ResponseSchema(name="confidence", description="对回答确信度的估计(0-100)")
]
output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
# 在提示中使用格式说明
format_instructions = output_parser.get_format_instructions()
prompt_template = """
根据以下上下文回答问题,并按照指定格式输出:
{format_instructions}
上下文: {context}
问题: {question}
"""
常见RAG应用场景与实现思路
1. 企业内部知识库
连接公司文档、Wiki、产品手册等,构建内部智能助手:
python
# 企业知识库专用提示模板
enterprise_template = """
你是[公司名]的内部知识助手。你的回答必须:
1. 只使用提供的上下文信息回答问题
2. 引用确切的公司政策和规程
3. 如果不确定,建议用户联系相关部门
4. 保持专业、简洁的语气
上下文: {context}
问题: {question}
"""
2. 个性化学习助手
结合课程材料、教科书等,创建个性化学习体验:
python
# 学习助手专用回答生成函数
def generate_learning_answer(question, contexts, llm):
# 1. 直接回答问题
answer = llm.predict(f"基于以下资料回答问题: {contexts} 问题: {question}")
# 2. 生成相关练习题
practice_questions = llm.predict(
f"基于这个问题和上下文,生成3个相关的练习题: {question} {contexts[:500]}"
)
# 3. 提供进一步学习资源
additional_resources = llm.predict(
f"推荐进一步学习的主题或资源,与此问题相关: {question}"
)
return {
"answer": answer,
"practice_questions": practice_questions,
"additional_resources": additional_resources
}
3. 技术文档助手
支持API文档、框架说明等技术内容查询:
python
# 代码示例生成
def generate_code_example(docs, question, language="python"):
# 找到相关API文档
relevant_docs = retrieve_relevant_docs(docs, question)
# 基于文档生成代码示例
prompt = f"""
根据以下API文档,使用{language}语言生成一个简洁的代码示例:
API文档: {relevant_docs}
需求: {question}
请仅返回代码,附带简短注释说明。
"""
return llm.predict(prompt)
思考题
在处理大型文档集时,如何平衡向量数据库的性能与检索质量?
对于专业领域的RAG系统,如何选择合适的文档分割策略和块大小?
在使用LangChain实现RAG系统时,如何有效地处理多语言文档?
设计一个能够处理表格、图片等多模态信息的RAG系统,需要考虑哪些额外因素?
如何评估和优化RAG系统的检索性能和生成质量?
接下来,我们将深入探讨如何构建和部署一个完整的RAG系统,并进行性能优化和评估。